-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Deregister pipeline loader callback when inputsRunner is stopped #7893
Deregister pipeline loader callback when inputsRunner is stopped #7893
Conversation
filebeat/fileset/factory.go
Outdated
@@ -55,6 +56,7 @@ type inputsRunner struct { | |||
moduleRegistry *ModuleRegistry | |||
inputs []*input.Runner | |||
pipelineLoaderFactory PipelineLoaderFactory | |||
pipelineCallbackId int |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
struct field pipelineCallbackId should be pipelineCallbackID
filebeat/fileset/factory.go
Outdated
@@ -46,6 +46,7 @@ type Factory struct { | |||
beatVersion string | |||
pipelineLoaderFactory PipelineLoaderFactory | |||
overwritePipelines bool | |||
pipelineCallbackId int |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
struct field pipelineCallbackId should be pipelineCallbackID
cbcc5db
to
d566eec
Compare
The failing tests are unrelated. |
@@ -130,7 +134,7 @@ func (p *inputsRunner) Start() { | |||
callback := func(esClient *elasticsearch.Client) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reading the code on line 124 and 129: In case of an error, should we register a callback at all? Not introduced in this PR but probably something we should clean up.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this code is correct. The lines you are referring to are trying to load pipelines immediately in Start
. It's possible it fails due to e.g ES is not being not available.
But it's the whole point of using OnConnect callbacks. If previously pipeline loading fails, it can be attempted again when connecting to ES again.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@kvch I got tricked by this as well. Can you add a comment why the callback must always be registered:
- if pipeline loading fails, retry on reload
- always retry on reload in case of someone removing it by accident or new ES cluster in place
@@ -50,19 +50,40 @@ var ( | |||
) | |||
|
|||
type callbacksRegistry struct { | |||
callbacks []connectCallback | |||
callbacks map[int]connectCallback |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about using a uuid here instead of an int as identifier? Like this the incrementing and storing of the current key would not be needed.
@@ -50,19 +52,38 @@ var ( | |||
) | |||
|
|||
type callbacksRegistry struct { | |||
callbacks []connectCallback | |||
callbacks map[uuid.UUID]connectCallback |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This changes order in which callbacks will be called. I don't think we don't have any ordering requirements yet, but it's something we need to be aware of in the future: Callbacks must not depend on results of a former callback.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will add a comment, so it's less likely we forget it.
|
||
key := uuid.NewV4() | ||
connectCallbackRegistry.callbacks[key] = callback | ||
return key |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: while it's very very unlikely to have collissions, we might account for the chance of having collisions by checking the key does not exist.
6530daf
to
a48c6bb
Compare
WFG |
…stic#7893) (cherry picked from commit 65ef265)
…stic#7893) (cherry picked from commit 65ef265)
(cherry picked from commit 65ef265)
…stic#7893) (elastic#7912) (cherry picked from commit 9b06b40)
From now on
connectCallback
s of Elasticsearch can be deregistered.Pipeline loader callbacks are deregistered when
inputsRunner
is stopped, so unnecessary callback calls are eliminated after reload and during autodiscovery. This also lets the GC to collect leftover states e.g.moduleRegistry
.Closes #7891